package com.xiaojie.rabbitmq.tx;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaojie.rabbitmq.MyConnection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * @Description: mq事务模式保证消息可靠性
 * @author: xiaojie
 * @date: 2021.09.28
 */
public class TxProvider {
    //定义队列
    private static final String QUEUE_NAME = "myqueue";
    static Channel channel = null;
    static Connection connection = null;

    public static void main(String[] args) {
        try {
            System.out.println("生产者启动成功..");
            // 1.创建连接
            connection = MyConnection.getConnection();
            // 2.创建通道
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String msg = "测试事务机制保证消息发送可靠性。。。。";
            channel.txSelect(); //开启事务
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
            //发生异常时，mq中并没有新的消息入队列
            //int i=1/0;
            //没有发生异常，提交事务
            channel.txCommit();
            System.out.println("生产者发送消息成功:" + msg);
        } catch (Exception e) {
            e.printStackTrace();
            //发生异常则回滚事务
            try {
                if (channel != null) {
                    channel.txRollback();
                }
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }

        }
    }
}
